[][src]Crate rumqttc

A pure rust MQTT client which strives to be robust, efficient and easy to use. This library is backed by an async (tokio) eventloop which handles all the robustness and and efficiency parts of MQTT but naturally fits into both sync and async worlds as we'll see

Let's jump into examples right away

A simple synchronous publish and subscribe

use rumqttc::{MqttOptions, Client, QoS};
use std::time::Duration;
use std::thread;

let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(5);

let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
thread::spawn(move || for i in 0..10 {
   client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
   thread::sleep(Duration::from_millis(100));
});

// Iterate to poll the eventloop for connection progress
for (i, notification) in connection.iter().enumerate() {
    println!("Notification = {:?}", notification);
}

A simple asynchronous publish and subscribe

use rumqttc::{MqttOptions, AsyncClient, QoS};
use tokio::{task, time};
use std::time::Duration;
use std::error::Error;

let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(5);

let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).await.unwrap();

task::spawn(async move {
    for i in 0..10 {
        client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).await.unwrap();
        time::sleep(Duration::from_millis(100)).await;
    }
});

loop {
    let notification = eventloop.poll().await.unwrap();
    println!("Received = {:?}", notification);
}

Quick overview of features

  • Eventloop orchestrates outgoing/incoming packets concurrently and hadles the state
  • Pings the broker when necessary and detects client side half open connections as well
  • Throttling of outgoing packets (todo)
  • Queue size based flow control on outgoing packets
  • Automatic reconnections by just continuing the eventloop.poll()/connection.iter() loop`
  • Natural backpressure to client APIs during bad network
  • Immediate cancellation with client.cancel()

In short, everything necessary to maintain a robust connection

Since the eventloop is externally polled (with iter()/poll() in a loop) out side the library and Eventloop is accessible, users can

  • Distribute incoming messages based on topics
  • Stop it when required
  • Access internal state for use cases like graceful shutdown or to modify options before reconnection

Important notes

  • Looping on connection.iter()/eventloop.poll() is necessary to run the event loop and make progress. It yields incoming and outgoing activity notifications which allows customization as you see fit.

  • Blocking inside the connection.iter()/eventloop.poll() loop will block connection progress.

FAQ

Connecting to a broker using raw ip doesn't work

You cannot create a TLS connection to a bare IP address with a self-signed certificate. This is a limitation of rustls. One workaround, which only works under *nix/BSD-like systems, is to add an entry to wherever your DNS resolver looks (e.g. /etc/hosts) for the bare IP address and use that name in your code.

Modules

v4
v5

Structs

AsyncClient

AsyncClient to communicate with MQTT Eventloop This is cloneable and can be used to asynchronously Publish, Subscribe.

Client

Client to communicate with MQTT eventloop Connection.

ClientConfig

Common configuration for (typically) all connections made by a program.

ConnAck

Acknowledgement to connect packet

Connect

Connection packet initiated by the client

Connection

MQTT connection. Maintains all the necessary state

Disconnect
EventLoop

Eventloop with all the state of a connection

FixedHeader

Packet type from a byte

LastWill

LastWill that broker forwards on behalf of the client

Login
MqttOptions

Options to configure the behaviour of mqtt connection

MqttState

State of the mqtt connection.

PingReq
PingResp
PubAck

Acknowledgement to QoS1 publish

PubComp

Acknowledgement to QoS1 publish

PubRec

Acknowledgement to QoS1 publish

PubRel

Acknowledgement to QoS1 publish

Publish

Publish packet

SendError

An error returned from Sender::send().

Sender

The sending side of a channel.

SubAck

Acknowledgement to subscribe

Subscribe

Subscription packet

SubscribeFilter

Subscription filter

UnsubAck

Acknowledgement to unsubscribe

Unsubscribe

Unsubscribe packet

Enums

ClientError

Client Error

ConnectReturnCode

Return code in connack

ConnectionError

Critical errors during eventloop polling

Error

Error during serialization and deserialization

Event

Events which can be yielded by the event loop

Key

Key type for TLS authentication

Outgoing

Current outgoing activity on the eventloop

Packet

Encapsulates all MQTT packet types

PacketType

MQTT packet type

Protocol

Protocol type

QoS

Quality of service

Request

Requests by the client to mqtt event loop. Request are handled one by one.

RetainForwardRule
StateError

Errors during state handling

SubscribeReasonCode
TlsConfiguration
Transport
TrySendError

An error returned from Sender::try_send().

Functions

certs

Extract all the certificates from rd, and return a vec of key::Certificates containing the der-format contents.

check

Checks if the stream has enough bytes to frame a packet and returns fixed header only if a packet can be framed with existing bytes in the stream. The passed stream doesn't modify parent stream's cursor. If this function returned an error, next check on the same parent stream is forced start with cursor at 0 again (Iter is owned. Only Iter's cursor is changed internally)

has_wildcards

Checks if a topic or topic filter has wildcards

matches

Checks if topic matches a filter. topic and filter validation isn't done here.

pkcs8_private_keys

Extract all PKCS8-encoded private keys from rd, and return a vec of key::PrivateKeys containing the der-format contents.

qos

Maps a number to QoS

read

Reads a stream of bytes and extracts next MQTT packet out of it

rsa_private_keys

Extract all RSA private keys from rd, and return a vec of key::PrivateKeys containing the der-format contents.

valid_filter

Checks if the filter is valid

valid_topic

Checks if a topic is valid

Type Definitions

Incoming